- 作者:老汪软件技巧
- 发表时间:2024-10-17 04:03
- 浏览量:
es数据预处理是指在从数据源写入es的中间环节对数据进行的处理,可以看作大数据的ETL(抽取、转换、加载)环节。
1. 预处理定义
一般情况下,在程序中写入数据或者从第三方数据源(MySQL、Spark等)导入数据,都是直接批量同步es的,原始数据长什么样,写入索引化的数据就是什么样。
数据预处理的步骤大致拆解如下。
数据清洗:主要是为了去除重复数据、去除干扰数据以及填充默认值。数据集成:将多个数据源的数据放在一个统一的数据存储平台中。数据转换:将数据转化成适合进行数据挖掘或分析的形式。
第二章提到的ingest节点的本质是在实际文档建立索引之前使用ingest节点对文档进行预处理。ingest节点的主要职责是拦截并处理来自批量索引或单个索引的请求。它先用预设的转换规则对请求数据进行处理,再将处理后的文档通过相应的单个索引或批量索引的API进行写入,从而将数据存储到相应的位置。所做的这些工作就是数据预处理。
实际业务场景中,预处理步骤如下。
定义预处理管道,通过管道实现数据预处理。根据实际要处理的复杂数据的特点,有针对性地设置一个或者多个管道。写入数据关联预处理管道。写入数据、更新数据或者reindex操作环节,指定要处理索引的管道,实际就是将写入索引与上面的预处理管道1和预处理管道2关联起来。写入数据。
一句话概括:ingest节点的预处理工作是实际文档索引化之前对文档进行的预先处理。
2. 预处理器的分类
3. 预处理实现
如果没有预处理,而是将数据原样导入es,并在分析阶段做检索脚本处理,那么一方面会发现脚本处理能力有限,另一方面会徒增性能问题。所以,非必要不推荐直接导入数据。
而提前借助ingest节点进行数据预处理,做好必要的数据清洗(ETL),哪怕增大空间存储(如新增字段),也要以空间换时间,为后续分析环节扫清障碍。这样看似写入变得复杂,实则为分析赢取了时间。业务层面涉及数据处理的场景都推荐使用ingest预处理实现。
预处理管道的定义如下。
PUT _ingest/pipeline/my_pipeline_id
{
"version": 1,
"processors": [
{}
]
}
4. 预处理实战4.1 字符串切分
根据_id字符串切分再聚合统计
PUT lwyindex
{
"mappings": {
"properties": {
"mid":{
"type": "keyword"
}
}
}
}
POST lwyindex/_bulk
{"index":{"_id":1}}
{"mid":"C12345"}
{"index":{"_id":2}}
{"mid":"C12456"}
{"index":{"_id":3}}
{"mid":"C31268"}
# 预处理,提取前两个字符
PUT _ingest/pipeline/split_mid
{
"processors": [
{
"script": {
"lang": "painless",
"source": "ctx.mid_prefix = ctx.mid.substring(0,2)"
}
}
]
}
# 通过预处理进行更新
POST lwyindex/_update_by_query?pipeline=split_mid
{
"query": {
"match_all": {}
}
}
4.2 字符串转为JSON格式
{
"headers":{
"userInfo":[
"{ \"password\": \"test\",\n \"username\":\"lwy\"}"
]
}
}
插入数据时想把JSON类型转换为Object类型
POST lwy_index/_doc/1
{
"headers":{
"userInfo":[
"{ \"password\": \"test\",\n \"username\":\"lwy\"}"
]
}
}
GET lwy_index/_mapping
PUT _ingest/pipeline/json_builder
{
"processors": [
{
"json":{
"field": "headers.userInfo",
"target_field": "headers.userjson"
}
}
]
}
POST lwy_index/_update_by_query?pipeline=json_builder
再次查看mapping会发现多了一个userjson字段,将userInfo字符串json化了。
4.4 enrich预处理
将实战项目抽象为如下两个需求。
需求1:将如下所示的topicA和topicB的数据写到同一个索引中,但更新速度太慢,如何加速写入呢?(topicA和topicB的数据可能会有几天的延时。)
kafka源数据:
topicA:{"A_content":"XXX","name":"A","type":"XXX","id":1}
topicB:{"B_content":"XXX","name":"B","type":"XXX","id":1}
需求2:在cluster1上有a、b两索引,a、b均有字段filed_a,又各自包含其他字段,现需要建立新索引c,要求c包含a的全部文档,且对于a和b的关联字段field_a的值相同的文档,需要把b的其他字段更新到c中。
1.需求分析
如上两个需求都涉及两个索引数据之间的关联。
但以上四种方式都无法实现上述需求。
而对于需求2,需要构建一个全新的索引,这个索引需要包含另一索引的所有文档数据。更进一步,我们希望能在不同索引间通过相同的关联字段,从一个索引向另一个索引扩充字段信息。这是一个涉及跨索引字段扩充的任务。在es6.5版本的ingest预处理环节新增了enrich processor字段来丰富功能,能很好地实现该需求。
2.认识enrich processor
从全局视角来看,enrich processor是ingest预处理管道中众多预处理器中的一个。借助enrich预处理管道,可以将已有索引中的数据添加到新写入的文档中。官方举例如下。
3.非enrich processor工作原理
4.enrich processor工作原理
enrich policy。policy实际是阶段(phrase)和动作(action)的综合体。
enrich数据预处理环节,其组成成分有哪些?(1) enrich policy
PUT lwy_index
{
"mappings": {
"properties": {
"name":{
"type": "text"
}
}
}
}
PUT /_enrich/policy/data-policy
{
"match": {
"indices": "lwy_index",
"match_field": "name",
"enrich_fields": [
"author",
"publisher"
]
}
}
(2) source index
用于丰富新写入文档的索引。它是目标索引中添加的待富化数据的源头数据索引。
(3) enrich index
执行enrich policy生成的索引
POST /_enrich/policy/data-policy/_execute
enrich index特点:
通过Get方式获取索引信息
Get .enrich-data-policy*
直接用source索引不是更直接吗?使用source索引直接将传入文档与源索引中的文档进行匹配,可能会很慢且需要大量资源。为了加快速度,enrich索引应运而生。使用source源索引过程中可能会有大量的增删改查操作,而enrich一经创建便不允许更改,除非进行重新执行policy。
综上可知,enrich processor适用于日志场景,以及其他需要对跨索引的丰富数据进行预处理的场景。
enrich processor需执行多项操作,可能会影响ingest管道的速度。对此,官方强烈建议在将enrich预处理器部署到生产环境之前对其进行测试和基准测试。同时,官方不建议使用enrich处理器来丰富实时数据。enrich processor最适合的数据类型为不经常更改的索引数据。
5.enrich processor实战
针对开始时提出的需求1、需求2,使用传统的索引关联方式并不能解决问题,尝试使用enrich processor来实现
(1)创建初始索引
PUT lwy_index1
{
"mappings": {
"properties": {
"field_a":{
"type": "keyword"
},
"title":{
"type": "keyword"
},
"publish_time":{
"type": "date"
}
}
}
}
POST lwy_index1/_bulk
{"index":{"_id":1}}
{"field_a":"aaa","title":"es in action", "publish_time":"2017-07-01T00:00:00"}
# 创建索引
PUT lwy_index2
{
"mappings": {
"properties": {
"field_a":{
"type": "keyword"
},
"author":{
"type": "keyword"
},
"publisher":{
"type": "keyword"
}
}
}
}
# 和lwy_index1存在相同字段field_a
POST lwy_index2/_bulk
{"index":{"_id":1}}
{"field_a":"aaa","author":"jerry", "publisher":"CD"}
(2) 创建data-policy并执行
PUT /_enrich/policy/data-policy
{
"match": {
"indices": [
"lwy_index2"
],
"match_field": "field_a",
"enrich_fields": [
"author",
"publisher"
]
}
}
POST /_enrich/policy/data-policy/_execute
(3) 创建预处理管道
PUT /_ingest/pipeline/data_lookup
{
"processors": [
{
"enrich": {
"policy_name": "data-policy",
"field": "field_a",
"target_field": "field_from_bindex",
"max_matches": 1
}
},
{
"append": {
"field": "author",
"value": "{{field_from_bindex.author}}"
}
},
{
"append": {
"field": "publisher",
"value": "{{field_from_bindex.publisher}}"
}
},
{
"remove": {
"field": "field_from_bindex"
}
}
]
}
(4)建立reindex索引
POST _reindex
{
"source": {
"index": [
"lwy_index1"
]
},
"dest":{
"index": "lwy_index_max",
"pipeline": "data_lookup"
}
}
POST lwy_index_max/_search
可见,索引lwy_index_max实现了索引lwy_index1和索引lwy_index2的融合
新写入的文档通过enrich processor达到了跨索引丰富数据的目的,最终写入目标索引。而丰富数据是借助enrich policy将源索引生成系统只读索引(enrich index)实现的。enrich processor预处理可以算作跨索引处理数据的扩展。
4.5 预处理实战常见问题
1.ingest角色的必要性
所有节点都默认启用ingest角色,因此任何节点都可以完成数据的预处理任务。但当集群数据量级够大且预处理任务繁重时,建议拆分节点角色,设置独立专用的ingest节点。
2.何时指定预处理管道
PUT lwy_index
{
"settings": {
"index.default_pipeline": "split_mid"
}
}
PUT _index_template/lwy_template
{
"index_patterns":[
"lwyindex*"
],
"template": {
"settings":{
"index.default_pipeline": "split_mid"
}
}
}
POST _reindex
{
"source": {
"index": "lwy_index1"
},
"dest": {
"index": "lwy_index2",
"pipeline": "split_mid"
}
}
POST lwy_index/_update_by_query?pipeline=json_builder