elasticsearch之Document APIs【Reindex API】

2024-08-27 16:18

本文主要是介绍elasticsearch之Document APIs【Reindex API】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

环境

elasticsearch:5.5

Reindex API

重要

Reindex不会尝试设置目标索引(即数据库)。它不会复制源索引的设置。你应该在运行_reindex操作之前设置目标索引,包括设置mappings分片的数量,副本等待。

_reindex 的最基本形式是把一个索引中的文档复制到另一个索引中。下面的例子是将twitter索引(即:数据库)中的文档复制到new_twitter索引中:

POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter"}
}

返回的结果如下:

{"took" : 147,"timed_out": false,"created": 120,"updated": 0,"deleted": 0,"batches": 1,"version_conflicts": 0,"noops": 0,"retries": {"bulk": 0,"search": 0},"throttled_millis": 0,"requests_per_second": -1.0,"throttled_until_millis": 0,"total": 120,"failures" : [ ]
}

就像_update_by_query_reindex会从源索引中获取快照,然而目标索引必须是不同的索引,所以版本冲突是不可能的。dest元素可以像index api一样配置乐观并发锁控制。只省略version_type或者设置其为internal将会造成elasticsearch盲目的将文档转储到目标索引中,还会覆盖相同的typeid

POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "internal"}
}

设置version_typeexternal将会造成elasticsearch会保留源的版本号,创建任何缺失的文档,并且在目标索引中更新旧版本的文档,而不是在源索引中做这些:

POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "external"}
}

设置op_typecreate将会造成_reindex操作只会去创建在目标索引中缺失的文档。所有已存在的文档将会引起冲突:

POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","op_type": "create"}
}

默认情况下,版本冲突会中止_reindex进程,但是你可以在请求体中设置"conflicts": "proceed"来进行计数(即有多少个版本冲突)。

POST _reindex
{"conflicts": "proceed","source": {"index": "twitter"},"dest": {"index": "new_twitter","op_type": "create"}
}

你可以通过给source添加type或者添加查询条件来对文档进行限制。
这将只复制tweet's中的kimchynew_twitter中:

POST _reindex
{"source": {"index": "twitter","type": "tweet","query": {"term": {"user": "kimchy"}}},"dest": {"index": "new_twitter"}
}

indextypesource中都可以是列表,其允许你在一个请求中从多个source中进行复制。下面是从在twitterblog 索引中的tweetpost类型中复制文档。其也包含在twitter索引中的post类型和在blog索引中的tweet类型。如果你想更具体些,你需要使用query。其不会处理id冲突。目标索引就会有效的保留,但是不能预测哪个文档应该保留下来,因为顺序迭代没有定义好顺序。

POST _reindex
{"source": {"index": ["twitter", "blog"],"type": ["tweet", "post"]},"dest": {"index": "all_together"}
}

也可以通过设置size来限制处理文档的数量。下面仅仅从twitter中复制一个文档到new_twitter中:

POST _reindex
{"size": 1,"source": {"index": "twitter"},"dest": {"index": "new_twitter"}
}

如果你想从twitter索引中获取一系列特定的文档,你需要进行排序。
排序会使得scroll降低效率,但是在某些情况下,这是值得的。
如果可以的话,更应该偏向于选择通过sizesort进行查询。
下面例子是从twitter中复制10000文档到new_twitter中:

POST _reindex
{"size": 10000,"source": {"index": "twitter","sort": { "date": "desc" }},"dest": {"index": "new_twitter"}
}

source 部分支持在search request中支持的所有元素。例如:只想从原始文档中reindex一部分字段,可以使用source进行过滤:

POST _reindex
{"source": {"index": "twitter","_source": ["user", "tweet"]},"dest": {"index": "new_twitter"}
}

_update_by_query一样、_reindex支持脚本来修改文档。不同的是,_update_by_query,允许脚本修改文档的元数据。下面例子修改了源文档的版本:

POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "external"},"script": {"inline": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}","lang": "painless"}
}

就像_update_by_query,你可以设置ctx.op来改变在目标索引中执行的操作:

参数描述
noop如果你的脚本决定不必索引目标索引中的文档,那么可以设置ctx.op="noop"。在response body中的noop计数器中没有操作将会被报告
delete如果你的脚本判定必须从目标索引中删除文档,那么可以设置ctx.op = "delete"。删除的(文档)在response bodydeleted计数器中将会被报告

ctx.op设置为任何其他参数都是错误的。在ctx设置任何其他字段都是错误。

设置_versionnull或者从ctx map中清除, 就像在indexing请求中
发送没有version的请求。其会造成:无论目标版本或者在_reindex请求中你使用的version type,都会覆盖目标索引(即数据库)中的文档。

默认情况下,如果_reindex发现一个有路由的文档,接着该路由会被保留,除非通过脚本改变它。你可以在dest请求上设置routing来改变:

参数描述
keepbulk请求(每个匹配到的子项)设置匹配到的路由。默认
discard将发送的bulk请求(每个匹配到的子项)的路由设置为null
=<some text>将发送的bulk请求(每个匹配到的子项)的路由设置为=之后的所有文本。

例如:你可以使用下面这个请求从source索引(即数据库)中的company名称为cat的所有文档都复制到路由为catdest索引(即数据库)中。

POST _reindex
{"source": {"index": "source","query": {"match": {"company": "cat"}}},"dest": {"index": "dest","routing": "=cat"}
}

_reindex 默认使用scroll的批次是1000。你在source元素中可以使用size字段来修改批次的大小:

POST _reindex
{"source": {"index": "source","size": 100},"dest": {"index": "dest","routing": "=cat"}
}

通过指定pipelineReindex也可以使用Ingest Node特性,如下:

POST _reindex
{"source": {"index": "source"},"dest": {"index": "dest","pipeline": "some_ingest_pipeline"}
}

Reindex from Remote

Reindex 支持从一个远程的elasticsearch集群来重新索引:

POST _reindex
{"source": {"remote": {"host": "http://otherhost:9200","username": "user","password": "pass"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}

host 参数必须包含的格式:hostport(例如:https://otherhost:9200)。usernamepasswork参数是可选的,当它们存在时,reindex将把它们作为连接远程elasticsearch节点基本认证信息。当使用基础认证或者发送密码存文本请确保使用https

elasticsearch.yaml中使用reindex.remote.whitelist属性来明确远程主机的白名单。其可以设置一个由逗号分隔的远程主机列表(比如:otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*)。
通过白名单格式(Scheme)可以被忽略 —- 只使用hostport

这个特性应该适用于你可能找到的任何版本的elasticsearch远程集群。
这也就允许你通过reindexing从旧版本的集群中将任何版本的elasticsearch升级到最新版本。

要启用发送给旧版本的elasticsearch的查询,query参数将直接发送到远程主机,无需验证和修改。

注意

从远程主机reindexing是不支持manualautomatic slicing

从远程服务器上reindexing使用了堆栈缓存区,默认最大值是100mb
如果远程索引里的文档非常巨大,那么你需要使用小一点的批次大小。下例把批次大小为10,是非常非常小啦:

POST _reindex
{"source": {"remote": {"host": "http://otherhost:9200"},"index": "source","size": 10,"query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}

也可以用socket_timeout字段来设置在远程连接中socket读取超时时间,和使用connect_timeout字段来设置连接超时时间。这两个字段默认是30秒
下面这个例子是设置socket read超时时间为1分钟、连接超时时间为10秒

POST _reindex
{"source": {"remote": {"host": "http://otherhost:9200","socket_timeout": "1m","connect_timeout": "10s"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}

URL Parameters

除了标准参数像prettyReindex API也支持refreshwait_for_completionwait_for_active_shardstimeoutrequests_per_second

发送带refresh URL参数请求将会造成所有请求写入的索引(即:数据库)将会被刷新。这不同于Index APIrefresh参数,其只是造成收到新数据的分片进行刷新。

如果请求里包含wait_for_completion=false,接着elasticsearch将会进行预检,启动请求,和返回一个可以被用于Tasks APIstask,其可以被取消或者获取task的状态。elasticsearch也将会创建一个记录这个task的文档,该文档的路径是.tasks/task/${taskId}。你可以根据情况去考虑是否删除。当你删除后,elasticsearch会回收其空间。

wait_for_active_shards 用于控制在处理reindexing之前,副本分片必须有多少个存活。详情可以看这里

timeout 用于控制每个写请求在等待不可用分片变为可用需要等待多久时间。Both work exactly how they work in the Bulk API.

requests_per_second 可以设置任何正的十进制数字(1.4, 6, 1000, 等等)和 节流率:reindex通过每批次等待时间来分配批次的索引操作。可以通过设置requests_per_second-1来关闭这个限制。

通过批次之间的等待来实现这种限制,以便reindex使用内部的scroll可以传入一个差额的超时时间。这个差额时间是批处理大小除以requests_per_second再减去写入时间。批处理大小默认是1000,如果requests_per_second设置为500

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds

因为批处理是发送一个_bulk请求,所以大批量批处理将会造成elasticsearch创建许多请求和然后在下个集合开始之前等待一会。这是bursty而不是smooth。默认是-1

Response body

json的响应如下:

{"took" : 639,"updated": 0,"created": 123,"batches": 1,"version_conflicts": 2,"retries": {"bulk": 0,"search": 0}"throttled_millis": 0,"failures" : [ ]
}
字段描述
took从操作开始到结束总共花费的毫秒数
updated文档更新成功的数量
created成功创建文档的数量
batches通过reindex拉取到scroll响应的数量
version_conflictsreindex发送版本冲突的数量
retriesreindex尝试重试的次数。bulkbulk行为重试的次数。searchsearch行为重试的次数
throttled_millis请求符合requests_per_second睡眠的毫秒数
failures数组,包含所有失败的索引。如果不为空,请求会因为失败而中止。可以参考如何预防版本冲突而中止操作

Works with the Task API

你可以使用Task API来提取全部正在运行的reindex请求的状态。

GET _tasks?detailed=true&actions=*reindex

响应如下:

{"nodes" : {"r1A2WoRbTwKZ516z6NEs5A" : {"name" : "r1A2WoR","transport_address" : "127.0.0.1:9300","host" : "127.0.0.1","ip" : "127.0.0.1:9300","attributes" : {"testattr" : "test","portsfile" : "true"},"tasks" : {"r1A2WoRbTwKZ516z6NEs5A:36619" : {"node" : "r1A2WoRbTwKZ516z6NEs5A","id" : 36619,"type" : "transport","action" : "indices:data/write/reindex","status" : {    ①"total" : 6154,"updated" : 3500,"created" : 0,"deleted" : 0,"batches" : 4,"version_conflicts" : 0,"noops" : 0,"retries": {"bulk": 0,"search": 0},"throttled_millis": 0},"description" : ""}}}}
}

这个对象包含实际的状态。其就是json格式,并且重要的是其包含total字段。total:期望执行reindex操作的总数。你可以通过添加updatedcreateddeleted字段来预估进度。当它们的总和等于total字段时,请求将会结束。

你可以直接使用task id来查看任务信息:

GET /_tasks/taskId:1

上面这个api的优点是其与wait_for_completion=false整合来明确的返回已完成task的状态。如果task已完成并且设置了wait_for_completion=false,将返回results或者error字段。wait_for_completion=false这个特性的代价是在.tasks/task/${taskId}中创建一个文档。你可以更新需要删除它。

Works with the Cancel Task API

任何一个reindex都可以使用Task Cancel API进行取消:

POST _tasks/task_id:1/_cancel

可以使用上面的api来发现task_id
取消应该是快速发生但是也有可能花费几分钟。上面任务状态api将会继续列出任务直到其自身被取消。

Rethrottling

使用_rethrottle api可以改变正在运行的reindexrequests_per_second值:

POST _reindex/task_id:1/_rethrottle?requests_per_second=-1

使用上面的api可以找到task_id.
就像在_reindex API设置一样,requests_per_second可以设置为-1来禁用throttling,或者设置任意十进制数像1.7或者12来设置throttle的级别。Rethrottling加速查询会立即生效,但是rethrotting在完成当前批次之后缓慢查询将会生效。这是为了防止scroll超时。

Reindex to change the name of a field

_reindex可以在复制索引时重命名字段。假设你创建如下一个文档:

POST test/test/1?refresh
{"text": "words words","flag": "foo"
}

假设现在你不喜欢flag这个名字,想把它换成tag。可以使用_reindex来实现:

POST _reindex
{"source": {"index": "test"},"dest": {"index": "test2"},"script": {"inline": "ctx._source.tag = ctx._source.remove(\"flag\")"}
}

现在你得到新文档:

GET test2/test/1

如下:

{"found": true,"_id": "1","_index": "test2","_type": "test","_version": 1,"_source": {"text": "words words","tag": "foo"}
}

或者你可以通过tag或是任何你想的进行搜索。

Manual slicing

reindex支持Sliced Scroll,允许你相对容易的手动并行处理:

POST _reindex
{"source": {"index": "twitter","slice": {"id": 0,"max": 2}},"dest": {"index": "new_twitter"}
}
POST _reindex
{"source": {"index": "twitter","slice": {"id": 1,"max": 2}},"dest": {"index": "new_twitter"}
}

你可以校验效果:

GET _refresh
POST new_twitter/_search?size=0&filter_path=hits.total

其结果中会有一个total如下:

{"hits": {"total": 120}
}

Automatic slicing

你也可以使用Sliced Scrollreindex自动并行化:

POST _reindex?slices=5&refresh
{"source": {"index": "twitter"},"dest": {"index": "new_twitter"}
}

你可以校验效果:

POST new_twitter/_search?size=0&filter_path=hits.total

其结果中会有一个total

{"hits": {"total": 120}
}

在上面部分中添加slicesreindex只是将手动处理自动化,创建子请求意味着其会有一些特殊:

  • 你可以在Tasks APIs中看到这些请求,这些子任务是带有slices请求的孩子任务。
  • slices请求获取任务状态时,只会获取已完成slices的状态。
  • 这些子请求像取消rethrottling是独立寻址的;
  • slicesRethrottling请求将会限制未完成子请求的比例。
  • slicesCanceling请求将会取消每个子请求。
  • 由于slices的特性,每个子查询不会得到完全均匀的文档。所有的文档都将会被处理,但是某些slices可能会比其他的大些。期望大切片分配的更均匀。
  • slices请求上使用像requests_per_secondsize参数按比例分配到每个子请求上。结合上面slices分配不均匀的问题,你应该可以得出结论使用带slices_reindex中使用size,可能不会得到正确的文档大小。
  • 每个子请求会从源索引那里得到略有不同的快照,尽管看上去他们获取的时间大致相同。

Picking the number of slices

这里我们有些关于使用slices的一些建议(如果手动并行化的话,那么就是在slice APImax的参数):

  • 不要使用大数字。比如500,会使CPU发生相当多的thrash(大部分时间都用在翻页上)。
  • 从查询性能的角度来看,在源索引中使用分片的倍数是更高效的。
  • 从查询性能的角度来看,在源索引中使用和分片一样的数量是最高效的。
  • 索引的性能应该在可利用资源之间以切片数量线性扩展。
  • 索引(插入)或者查询性能是否主导进程取决于很多因素,像文档重新索引和集群正在重新索引。

Reindex daily indices

你可以使用_reindexPainless组合来reindex daily indices,以把新模板应用到已有的文档上。
假设你有以下文件组成的索引:

PUT metricbeat-2016.05.30/beat/1?refresh
{"system.cpu.idle.pct": 0.908}
PUT metricbeat-2016.05.31/beat/1?refresh
{"system.cpu.idle.pct": 0.105}

metricbeat-*索引新模板早已加载到了elasticsearch,但是其仅仅适用于新创建的索引。Painless可用于重新索引已存在的文档和应用新文档。

下面的脚本从索引名称中 提取日期,并创建一个带-1的新索引。来自metricbeat-2016.05.31的所有数据将重新索引到metricbeat-2016.05.31-1

POST _reindex
{"source": {"index": "metricbeat-*"},"dest": {"index": "metricbeat"},"script": {"lang": "painless","inline": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"}
}

之前metricbeat索引中的所有文档现在可以在*-1索引中查询。

GET metricbeat-2016.05.30-1/beat/1
GET metricbeat-2016.05.31-1/beat/1

以前的方法也可以联合change the name of a field使用来只加载已存在的数据到新索引中,如果需要也可以重命名字段。

Extracting a random subset of an index

reindex 也可以随机提取索引中的一个子集:

POST _reindex
{"size": 10,"source": {"index": "twitter","query": {"function_score" : {"query" : { "match_all": {} },"random_score" : {}}},"sort": "_score"    ①},"dest": {"index": "random_twitter"}
}

①:reindex 默认是通过_doc排序,所以random_score不会有任何效果,除非你覆盖_score排序。

参考地址:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html

这篇关于elasticsearch之Document APIs【Reindex API】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1112158

相关文章

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大,但不同的语言在不同领域都有着自己的优势,为了强强联合,LabVIEW提供了强大的外部程序接口能力,包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等。通过DLL可以使用户很方便地调用C、C++、C#、VB等编程语言写的程序以及windows自带的大

如何更优雅地对接第三方API

如何更优雅地对接第三方API 本文所有示例完整代码地址:https://github.com/yu-linfeng/BlogRepositories/tree/master/repositories/third 我们在日常开发过程中,有不少场景会对接第三方的API,例如第三方账号登录,第三方服务等等。第三方服务会提供API或者SDK,我依稀记得早些年Maven还没那么广泛使用,通常要对接第三方

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

JavaScript中document.cookie

“某些 Web 站点在您的硬盘上用很小的文本文件存储了一些信息,这些文件就称为 Cookie。”—— MSIE 帮助。一般来说,Cookies 是 CGI 或类似,比 HTML 高级的文件、程序等创建的,但是 javascript 也提供了对 Cookies 的很全面的访问权利。       每个 Cookie 都是这样的:<cookie名>=<值>   <cookie名>的限制与 javasc

Restful API 原理以及实现

先说说API 再说啥是RESRFUL API之前,咱先说说啥是API吧。API大家应该都知道吧,简称接口嘛。随着现在移动互联网的火爆,手机软件,也就是APP几乎快爆棚了。几乎任何一个网站或者应用都会出一款iOS或者Android APP,相比网页版的体验,APP确实各方面性能要好很多。 那么现在问题来了。比如QQ空间网站,如果我想获取一个用户发的说说列表。 QQ空间网站里面需要这个功能。

京东物流查询|开发者调用API接口实现

快递聚合查询的优势 1、高效整合多种快递信息。2、实时动态更新。3、自动化管理流程。 聚合国内外1500家快递公司的物流信息查询服务,使用API接口查询京东物流的便捷步骤,首先选择专业的数据平台的快递API接口:物流快递查询API接口-单号查询API - 探数数据 以下示例是参考的示例代码: import requestsurl = "http://api.tanshuapi.com/a

WordPress开发中常用的工具或api文档

http://php.net/ http://httpd.apache.org/ https://wordpress.org/ https://cn.wordpress.org/ https://core.svn.wordpress.org/ zh-cn:开发者文档: https://codex.wordpress.org/zh-cn:%E5%BC%80%E5%8F%91%E8%80%

Java后端微服务架构下的API限流策略:Guava RateLimiter

Java后端微服务架构下的API限流策略:Guava RateLimiter 大家好,我是微赚淘客返利系统3.0的小编,是个冬天不穿秋裤,天冷也要风度的程序猿! 在微服务架构中,API限流是保护服务不受过度使用和拒绝服务攻击的重要手段。Guava RateLimiter是Google开源的Java库中的一个组件,提供了简单易用的限流功能。 API限流概述 API限流通过控制请求的速率来防止

ElasticSearch的DSL查询⑤(ES数据聚合、DSL语法数据聚合、RestClient数据聚合)

目录 一、数据聚合 1.1 DSL实现聚合 1.1.1 Bucket聚合  1.1.2 带条件聚合 1.1.3 Metric聚合 1.1.4 总结 2.1 RestClient实现聚合 2.1.1 Bucket聚合 2.1.2 带条件聚合 2.2.3 Metric聚合 一、数据聚合 聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。例如: