本文主要是介绍elasticsearch之Document APIs【Reindex API】,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
环境
elasticsearch:5.5
Reindex API
重要
Reindex
不会尝试设置目标索引(即数据库)。它不会复制源索引的设置。你应该在运行_reindex
操作之前设置目标索引,包括设置mappings
,分片的数量
,副本等待。
_reindex
的最基本形式是把一个索引中的文档复制到另一个索引中。下面的例子是将twitter
索引(即:数据库)中的文档复制到new_twitter
索引中:
POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter"}
}
返回的结果如下:
{"took" : 147,"timed_out": false,"created": 120,"updated": 0,"deleted": 0,"batches": 1,"version_conflicts": 0,"noops": 0,"retries": {"bulk": 0,"search": 0},"throttled_millis": 0,"requests_per_second": -1.0,"throttled_until_millis": 0,"total": 120,"failures" : [ ]
}
就像_update_by_query
,_reindex
会从源索引中获取快照,然而目标索引必须是不同的索引,所以版本冲突是不可能的。dest
元素可以像index api
一样配置乐观并发锁控制。只省略version_type
或者设置其为internal
将会造成elasticsearch
盲目的将文档转储到目标索引中,还会覆盖相同的type
和id
:
POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "internal"}
}
设置version_type
为external
将会造成elasticsearch
会保留源的版本号,创建任何缺失的文档,并且在目标索引中更新旧版本的文档,而不是在源索引中做这些:
POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "external"}
}
设置op_type
为create
将会造成_reindex
操作只会去创建在目标索引中缺失的文档。所有已存在的文档将会引起冲突:
POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","op_type": "create"}
}
默认情况下,版本冲突会中止_reindex
进程,但是你可以在请求体中设置"conflicts": "proceed"
来进行计数(即有多少个版本冲突)。
POST _reindex
{"conflicts": "proceed","source": {"index": "twitter"},"dest": {"index": "new_twitter","op_type": "create"}
}
你可以通过给source
添加type
或者添加查询条件来对文档进行限制。
这将只复制tweet's
中的kimchy
到new_twitter
中:
POST _reindex
{"source": {"index": "twitter","type": "tweet","query": {"term": {"user": "kimchy"}}},"dest": {"index": "new_twitter"}
}
index
和 type
在source
中都可以是列表,其允许你在一个请求中从多个source
中进行复制。下面是从在twitter
、blog
索引中的tweet
、post
类型中复制文档。其也包含在twitter
索引中的post
类型和在blog
索引中的tweet
类型。如果你想更具体些,你需要使用query
。其不会处理id
冲突。目标索引就会有效的保留,但是不能预测哪个文档应该保留下来,因为顺序迭代没有定义好顺序。
POST _reindex
{"source": {"index": ["twitter", "blog"],"type": ["tweet", "post"]},"dest": {"index": "all_together"}
}
也可以通过设置size
来限制处理文档的数量。下面仅仅从twitter
中复制一个文档到new_twitter
中:
POST _reindex
{"size": 1,"source": {"index": "twitter"},"dest": {"index": "new_twitter"}
}
如果你想从twitter
索引中获取一系列特定的文档,你需要进行排序。
排序会使得scroll
降低效率,但是在某些情况下,这是值得的。
如果可以的话,更应该偏向于选择通过size
和sort
进行查询。
下面例子是从twitter
中复制10000
文档到new_twitter
中:
POST _reindex
{"size": 10000,"source": {"index": "twitter","sort": { "date": "desc" }},"dest": {"index": "new_twitter"}
}
source 部分支持在search request
中支持的所有元素。例如:只想从原始文档中reindex
一部分字段,可以使用source
进行过滤:
POST _reindex
{"source": {"index": "twitter","_source": ["user", "tweet"]},"dest": {"index": "new_twitter"}
}
和_update_by_query
一样、_reindex
支持脚本来修改文档。不同的是,_update_by_query
,允许脚本修改文档的元数据。下面例子修改了源文档的版本:
POST _reindex
{"source": {"index": "twitter"},"dest": {"index": "new_twitter","version_type": "external"},"script": {"inline": "if (ctx._source.foo == 'bar') {ctx._version++; ctx._source.remove('foo')}","lang": "painless"}
}
就像_update_by_query
,你可以设置ctx.op
来改变在目标索引中执行的操作:
参数 | 描述 |
---|---|
noop | 如果你的脚本决定不必索引 目标索引中的文档,那么可以设置ctx.op="noop" 。在response body 中的noop 计数器中没有操作将会被报告 |
delete | 如果你的脚本判定必须从目标索引中删除文档,那么可以设置ctx.op = "delete" 。删除的(文档)在response body 的deleted 计数器中将会被报告 |
将ctx.op
设置为任何其他参数都是错误的。在ctx
设置任何其他字段都是错误。
设置_version
为null
或者从ctx map
中清除, 就像在indexing
请求中
发送没有version
的请求。其会造成:无论目标版本或者在_reindex
请求中你使用的version type
,都会覆盖目标索引(即数据库)中的文档。
默认情况下,如果_reindex
发现一个有路由的文档,接着该路由会被保留,除非通过脚本改变它。你可以在dest
请求上设置routing
来改变:
参数 | 描述 |
---|---|
keep | 为bulk 请求(每个匹配到的子项)设置匹配到的路由。默认 |
discard | 将发送的bulk 请求(每个匹配到的子项)的路由设置为null 。 |
=<some text> | 将发送的bulk 请求(每个匹配到的子项)的路由设置为= 之后的所有文本。 |
例如:你可以使用下面这个请求从source
索引(即数据库)中的company
名称为cat
的所有文档都复制到路由为cat
的dest
索引(即数据库)中。
POST _reindex
{"source": {"index": "source","query": {"match": {"company": "cat"}}},"dest": {"index": "dest","routing": "=cat"}
}
_reindex
默认使用scroll
的批次是1000
。你在source
元素中可以使用size
字段来修改批次的大小:
POST _reindex
{"source": {"index": "source","size": 100},"dest": {"index": "dest","routing": "=cat"}
}
通过指定pipeline
,Reindex
也可以使用Ingest Node
特性,如下:
POST _reindex
{"source": {"index": "source"},"dest": {"index": "dest","pipeline": "some_ingest_pipeline"}
}
Reindex from Remote
Reindex 支持从一个远程的elasticsearch
集群来重新索引:
POST _reindex
{"source": {"remote": {"host": "http://otherhost:9200","username": "user","password": "pass"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}
host
参数必须包含的格式:host
和port
(例如:https://otherhost:9200)。username
和passwork
参数是可选的,当它们存在时,reindex
将把它们作为连接远程elasticsearch
节点基本认证信息。当使用基础认证或者发送密码存文本请确保使用https
。
在elasticsearch.yaml
中使用reindex.remote.whitelist
属性来明确远程主机的白名单。其可以设置一个由逗号分隔的远程主机列表(比如:otherhost:9200, another:9200, 127.0.10.*:9200, localhost:*
)。
通过白名单格式(Scheme
)可以被忽略 —- 只使用host
和port
。
这个特性应该适用于你可能找到的任何版本的elasticsearch
远程集群。
这也就允许你通过reindexing
从旧版本的集群中将任何版本的elasticsearch
升级到最新版本。
要启用发送给旧版本的elasticsearch
的查询,query
参数将直接发送到远程主机,无需验证和修改。
注意
从远程主机
reindexing
是不支持manual
或automatic slicing
。
从远程服务器上reindexing
使用了堆栈缓存区
,默认最大值是100mb
。
如果远程索引里的文档非常巨大,那么你需要使用小一点的批次大小。下例把批次大小为10,是非常非常小啦:
POST _reindex
{"source": {"remote": {"host": "http://otherhost:9200"},"index": "source","size": 10,"query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}
也可以用socket_timeout
字段来设置在远程连接中socket
读取超时时间,和使用connect_timeout
字段来设置连接超时时间。这两个字段默认是30秒
。
下面这个例子是设置socket read
超时时间为1分钟
、连接超时时间为10秒
:
POST _reindex
{"source": {"remote": {"host": "http://otherhost:9200","socket_timeout": "1m","connect_timeout": "10s"},"index": "source","query": {"match": {"test": "data"}}},"dest": {"index": "dest"}
}
URL Parameters
除了标准参数像pretty
,Reindex API
也支持refresh
,wait_for_completion
,wait_for_active_shards
,timeout
和requests_per_second
。
发送带refresh URL
参数请求将会造成所有请求写入的索引(即:数据库)将会被刷新。这不同于Index API
的refresh
参数,其只是造成收到新数据的分片进行刷新。
如果请求里包含wait_for_completion=false
,接着elasticsearch
将会进行预检,启动请求,和返回一个可以被用于Tasks APIs
的task
,其可以被取消或者获取task
的状态。elasticsearch
也将会创建一个记录这个task
的文档,该文档的路径是.tasks/task/${taskId}
。你可以根据情况去考虑是否删除。当你删除后,elasticsearch
会回收其空间。
wait_for_active_shards
用于控制在处理reindexing
之前,副本分片必须有多少个存活。详情可以看这里
。
timeout
用于控制每个写请求在等待不可用分片变为可用需要等待多久时间。Both work exactly how they work in the Bulk API.
requests_per_second
可以设置任何正的十进制数字(1.4, 6, 1000, 等等)和 节流率:reindex
通过每批次等待时间来分配批次的索引操作。可以通过设置requests_per_second
为-1
来关闭这个限制。
通过批次之间的等待来实现这种限制,以便reindex
使用内部的scroll
可以传入一个差额的超时时间。这个差额时间是批处理大小除以requests_per_second
再减去写入时间。批处理大小默认是1000
,如果requests_per_second
设置为500
:
target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds
因为批处理是发送一个_bulk
请求,所以大批量批处理将会造成elasticsearch
创建许多请求和然后在下个集合开始之前等待一会。这是bursty
而不是smooth
。默认是-1
。
Response body
其json
的响应如下:
{"took" : 639,"updated": 0,"created": 123,"batches": 1,"version_conflicts": 2,"retries": {"bulk": 0,"search": 0}"throttled_millis": 0,"failures" : [ ]
}
字段 | 描述 |
---|---|
took | 从操作开始到结束总共花费的毫秒数 |
updated | 文档更新成功的数量 |
created | 成功创建文档的数量 |
batches | 通过reindex 拉取到scroll 响应的数量 |
version_conflicts | reindex 发送版本冲突的数量 |
retries | reindex 尝试重试的次数。bulk 是bulk 行为重试的次数。search 是search 行为重试的次数 |
throttled_millis | 请求符合requests_per_second 睡眠的毫秒数 |
failures | 数组,包含所有失败的索引。如果不为空,请求会因为失败而中止。可以参考如何预防版本冲突而中止操作 |
Works with the Task API
你可以使用Task API
来提取全部正在运行的reindex
请求的状态。
GET _tasks?detailed=true&actions=*reindex
响应如下:
{"nodes" : {"r1A2WoRbTwKZ516z6NEs5A" : {"name" : "r1A2WoR","transport_address" : "127.0.0.1:9300","host" : "127.0.0.1","ip" : "127.0.0.1:9300","attributes" : {"testattr" : "test","portsfile" : "true"},"tasks" : {"r1A2WoRbTwKZ516z6NEs5A:36619" : {"node" : "r1A2WoRbTwKZ516z6NEs5A","id" : 36619,"type" : "transport","action" : "indices:data/write/reindex","status" : { ①"total" : 6154,"updated" : 3500,"created" : 0,"deleted" : 0,"batches" : 4,"version_conflicts" : 0,"noops" : 0,"retries": {"bulk": 0,"search": 0},"throttled_millis": 0},"description" : ""}}}}
}
这个对象包含实际的状态。其就是
json
格式,并且重要的是其包含total
字段。total
:期望执行reindex
操作的总数。你可以通过添加updated
、created
和deleted
字段来预估进度。当它们的总和等于total
字段时,请求将会结束。
你可以直接使用task id
来查看任务信息:
GET /_tasks/taskId:1
上面这个api
的优点是其与wait_for_completion=false
整合来明确的返回已完成task
的状态。如果task
已完成并且设置了wait_for_completion=false
,将返回results
或者error
字段。wait_for_completion=false
这个特性的代价是在.tasks/task/${taskId}
中创建一个文档。你可以更新需要删除它。
Works with the Cancel Task API
任何一个reindex
都可以使用Task Cancel API
进行取消:
POST _tasks/task_id:1/_cancel
可以使用上面的api
来发现task_id
。
取消应该是快速发生但是也有可能花费几分钟。上面任务状态api
将会继续列出任务直到其自身被取消。
Rethrottling
使用_rethrottle api
可以改变正在运行的reindex
的requests_per_second
值:
POST _reindex/task_id:1/_rethrottle?requests_per_second=-1
使用上面的api
可以找到task_id
.
就像在_reindex API
设置一样,requests_per_second
可以设置为-1
来禁用throttling
,或者设置任意十进制数像1.7
或者12
来设置throttle
的级别。Rethrottling
加速查询会立即生效,但是rethrotting
在完成当前批次之后缓慢查询将会生效。这是为了防止scroll
超时。
Reindex to change the name of a field
_reindex
可以在复制索引时重命名字段。假设你创建如下一个文档:
POST test/test/1?refresh
{"text": "words words","flag": "foo"
}
假设现在你不喜欢flag
这个名字,想把它换成tag
。可以使用_reindex
来实现:
POST _reindex
{"source": {"index": "test"},"dest": {"index": "test2"},"script": {"inline": "ctx._source.tag = ctx._source.remove(\"flag\")"}
}
现在你得到新文档:
GET test2/test/1
如下:
{"found": true,"_id": "1","_index": "test2","_type": "test","_version": 1,"_source": {"text": "words words","tag": "foo"}
}
或者你可以通过tag
或是任何你想的进行搜索。
Manual slicing
reindex支持Sliced Scroll
,允许你相对容易的手动并行处理:
POST _reindex
{"source": {"index": "twitter","slice": {"id": 0,"max": 2}},"dest": {"index": "new_twitter"}
}
POST _reindex
{"source": {"index": "twitter","slice": {"id": 1,"max": 2}},"dest": {"index": "new_twitter"}
}
你可以校验效果:
GET _refresh
POST new_twitter/_search?size=0&filter_path=hits.total
其结果中会有一个total
如下:
{"hits": {"total": 120}
}
Automatic slicing
你也可以使用Sliced Scroll
让reindex
自动并行化:
POST _reindex?slices=5&refresh
{"source": {"index": "twitter"},"dest": {"index": "new_twitter"}
}
你可以校验效果:
POST new_twitter/_search?size=0&filter_path=hits.total
其结果中会有一个total
:
{"hits": {"total": 120}
}
在上面部分中添加slices
给reindex
只是将手动处理自动化,创建子请求意味着其会有一些特殊:
- 你可以在
Tasks APIs
中看到这些请求,这些子任务是带有slices
请求的孩子
任务。 - 带
slices
请求获取任务状态时,只会获取已完成slices
的状态。 - 这些子请求像
取消
和rethrottling
是独立寻址的; - 带
slices
的Rethrottling
请求将会限制未完成子请求的比例。 - 带
slices
的Canceling
请求将会取消每个子请求。 - 由于
slices
的特性,每个子查询不会得到完全均匀的文档。所有的文档都将会被处理,但是某些slices
可能会比其他的大些。期望大切片分配的更均匀。 - 在
slices
请求上使用像requests_per_second
和size
参数按比例分配到每个子请求上。结合上面slices
分配不均匀的问题,你应该可以得出结论使用带slices
的_reindex
中使用size
,可能不会得到正确的文档大小。 - 每个子请求会从源索引那里得到略有不同的快照,尽管看上去他们获取的时间大致相同。
Picking the number of slices
这里我们有些关于使用slices
的一些建议(如果手动并行化的话,那么就是在slice API
max的参数):
- 不要使用大数字。比如500,会使
CPU
发生相当多的thrash
(大部分时间都用在翻页上)。 - 从查询性能的角度来看,在源索引中使用分片的倍数是更高效的。
- 从查询性能的角度来看,在源索引中使用和分片一样的数量是最高效的。
- 索引的性能应该在可利用资源之间以切片数量线性扩展。
- 索引(插入)或者查询性能是否主导进程取决于很多因素,像文档重新索引和集群正在重新索引。
Reindex daily indices
你可以使用_reindex
和Painless
组合来reindex daily indices
,以把新模板应用到已有的文档上。
假设你有以下文件组成的索引:
PUT metricbeat-2016.05.30/beat/1?refresh
{"system.cpu.idle.pct": 0.908}
PUT metricbeat-2016.05.31/beat/1?refresh
{"system.cpu.idle.pct": 0.105}
metricbeat-*
索引新模板早已加载到了elasticsearch
,但是其仅仅适用于新创建的索引。Painless
可用于重新索引已存在的文档和应用新文档。
下面的脚本从索引名称中 提取日期,并创建一个带-1
的新索引。来自metricbeat-2016.05.31
的所有数据将重新索引到metricbeat-2016.05.31-1
。
POST _reindex
{"source": {"index": "metricbeat-*"},"dest": {"index": "metricbeat"},"script": {"lang": "painless","inline": "ctx._index = 'metricbeat-' + (ctx._index.substring('metricbeat-'.length(), ctx._index.length())) + '-1'"}
}
之前metricbeat
索引中的所有文档现在可以在*-1
索引中查询。
GET metricbeat-2016.05.30-1/beat/1
GET metricbeat-2016.05.31-1/beat/1
以前的方法也可以联合change the name of a field
使用来只加载已存在的数据到新索引中,如果需要也可以重命名字段。
Extracting a random subset of an index
reindex
也可以随机提取索引中的一个子集:
POST _reindex
{"size": 10,"source": {"index": "twitter","query": {"function_score" : {"query" : { "match_all": {} },"random_score" : {}}},"sort": "_score" ①},"dest": {"index": "random_twitter"}
}
①:reindex 默认是通过_doc
排序,所以random_score
不会有任何效果,除非你覆盖_score
排序。
参考地址:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html
这篇关于elasticsearch之Document APIs【Reindex API】的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!