• 作者:老汪软件技巧
  • 发表时间:2024-10-17 04:03
  • 浏览量:

es数据预处理是指在从数据源写入es的中间环节对数据进行的处理,可以看作大数据的ETL(抽取、转换、加载)环节。

1. 预处理定义

一般情况下,在程序中写入数据或者从第三方数据源(MySQL、Spark等)导入数据,都是直接批量同步es的,原始数据长什么样,写入索引化的数据就是什么样。

数据预处理的步骤大致拆解如下。

数据清洗:主要是为了去除重复数据、去除干扰数据以及填充默认值。数据集成:将多个数据源的数据放在一个统一的数据存储平台中。数据转换:将数据转化成适合进行数据挖掘或分析的形式。

第二章提到的ingest节点的本质是在实际文档建立索引之前使用ingest节点对文档进行预处理。ingest节点的主要职责是拦截并处理来自批量索引或单个索引的请求。它先用预设的转换规则对请求数据进行处理,再将处理后的文档通过相应的单个索引或批量索引的API进行写入,从而将数据存储到相应的位置。所做的这些工作就是数据预处理。

实际业务场景中,预处理步骤如下。

定义预处理管道,通过管道实现数据预处理。根据实际要处理的复杂数据的特点,有针对性地设置一个或者多个管道。写入数据关联预处理管道。写入数据、更新数据或者reindex操作环节,指定要处理索引的管道,实际就是将写入索引与上面的预处理管道1和预处理管道2关联起来。写入数据。

一句话概括:ingest节点的预处理工作是实际文档索引化之前对文档进行的预先处理。

2. 预处理器的分类

3. 预处理实现

如果没有预处理,而是将数据原样导入es,并在分析阶段做检索脚本处理,那么一方面会发现脚本处理能力有限,另一方面会徒增性能问题。所以,非必要不推荐直接导入数据。

而提前借助ingest节点进行数据预处理,做好必要的数据清洗(ETL),哪怕增大空间存储(如新增字段),也要以空间换时间,为后续分析环节扫清障碍。这样看似写入变得复杂,实则为分析赢取了时间。业务层面涉及数据处理的场景都推荐使用ingest预处理实现。

预处理管道的定义如下。

PUT _ingest/pipeline/my_pipeline_id 
{
  "version": 1,
  "processors": [
    {}
  ]
}

4. 预处理实战4.1 字符串切分

根据_id字符串切分再聚合统计

PUT lwyindex
{
  "mappings": {
    "properties": {
      "mid":{
        "type": "keyword"
      }
    }
  }
}
POST lwyindex/_bulk
{"index":{"_id":1}}
{"mid":"C12345"}
{"index":{"_id":2}}
{"mid":"C12456"}
{"index":{"_id":3}}
{"mid":"C31268"}
# 预处理,提取前两个字符
PUT _ingest/pipeline/split_mid
{
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.mid_prefix = ctx.mid.substring(0,2)"
      }
    }
  ]
}
# 通过预处理进行更新
POST lwyindex/_update_by_query?pipeline=split_mid
{
  "query": {
    "match_all": {}
  }
}

4.2 字符串转为JSON格式

{
  "headers":{
    "userInfo":[
      "{ \"password\": \"test\",\n \"username\":\"lwy\"}"
      ]
  }
}

插入数据时想把JSON类型转换为Object类型

POST lwy_index/_doc/1
{
  "headers":{
    "userInfo":[
      "{ \"password\": \"test\",\n \"username\":\"lwy\"}"
      ]
  }
}
GET lwy_index/_mapping
PUT _ingest/pipeline/json_builder
{
  "processors": [
    {
      "json":{
        "field": "headers.userInfo",
        "target_field": "headers.userjson"
      }
    }
  ]
}
POST lwy_index/_update_by_query?pipeline=json_builder

再次查看mapping会发现多了一个userjson字段,将userInfo字符串json化了。

4.4 enrich预处理

将实战项目抽象为如下两个需求。

需求1:将如下所示的topicA和topicB的数据写到同一个索引中,但更新速度太慢,如何加速写入呢?(topicA和topicB的数据可能会有几天的延时。)

kafka源数据:
topicA:{"A_content":"XXX","name":"A","type":"XXX","id":1}
topicB:{"B_content":"XXX","name":"B","type":"XXX","id":1}

需求2:在cluster1上有a、b两索引,a、b均有字段filed_a,又各自包含其他字段,现需要建立新索引c,要求c包含a的全部文档,且对于a和b的关联字段field_a的值相同的文档,需要把b的其他字段更新到c中。

1.需求分析

如上两个需求都涉及两个索引数据之间的关联。

但以上四种方式都无法实现上述需求。

而对于需求2,需要构建一个全新的索引,这个索引需要包含另一索引的所有文档数据。更进一步,我们希望能在不同索引间通过相同的关联字段,从一个索引向另一个索引扩充字段信息。这是一个涉及跨索引字段扩充的任务。在es6.5版本的ingest预处理环节新增了enrich processor字段来丰富功能,能很好地实现该需求。

2.认识enrich processor

从全局视角来看,enrich processor是ingest预处理管道中众多预处理器中的一个。借助enrich预处理管道,可以将已有索引中的数据添加到新写入的文档中。官方举例如下。

3.非enrich processor工作原理

4.enrich processor工作原理

enrich policy。policy实际是阶段(phrase)和动作(action)的综合体。

enrich数据预处理环节,其组成成分有哪些?(1) enrich policy

PUT lwy_index
{
  "mappings": {
    "properties": {
      "name":{
        "type": "text"
      }
    }
  }
}
PUT /_enrich/policy/data-policy
{
  "match": {
    "indices": "lwy_index",
    "match_field": "name",
    "enrich_fields": [
      "author",
      "publisher"
      ]
  }
}

(2) source index

用于丰富新写入文档的索引。它是目标索引中添加的待富化数据的源头数据索引。

(3) enrich index

执行enrich policy生成的索引

POST /_enrich/policy/data-policy/_execute

enrich index特点:

通过Get方式获取索引信息

Get .enrich-data-policy*

直接用source索引不是更直接吗?使用source索引直接将传入文档与源索引中的文档进行匹配,可能会很慢且需要大量资源。为了加快速度,enrich索引应运而生。使用source源索引过程中可能会有大量的增删改查操作,而enrich一经创建便不允许更改,除非进行重新执行policy。

综上可知,enrich processor适用于日志场景,以及其他需要对跨索引的丰富数据进行预处理的场景。

enrich processor需执行多项操作,可能会影响ingest管道的速度。对此,官方强烈建议在将enrich预处理器部署到生产环境之前对其进行测试和基准测试。同时,官方不建议使用enrich处理器来丰富实时数据。enrich processor最适合的数据类型为不经常更改的索引数据。

5.enrich processor实战

针对开始时提出的需求1、需求2,使用传统的索引关联方式并不能解决问题,尝试使用enrich processor来实现

(1)创建初始索引

PUT lwy_index1
{
  "mappings": {
    "properties": {
      "field_a":{
        "type": "keyword"
      },
      "title":{
        "type": "keyword"
      },
      "publish_time":{
        "type": "date"
      }
    }
  }
}
POST lwy_index1/_bulk
{"index":{"_id":1}}
{"field_a":"aaa","title":"es in action", "publish_time":"2017-07-01T00:00:00"}
# 创建索引
PUT lwy_index2
{
  "mappings": {
    "properties": {
      "field_a":{
        "type": "keyword"
      },
      "author":{
        "type": "keyword"
      },
      "publisher":{
        "type": "keyword"
      }
    }
  }
}
# 和lwy_index1存在相同字段field_a
POST lwy_index2/_bulk
{"index":{"_id":1}}
{"field_a":"aaa","author":"jerry", "publisher":"CD"}

(2) 创建data-policy并执行

PUT /_enrich/policy/data-policy
{
  "match": {
    "indices": [
      "lwy_index2"
      ],
    "match_field": "field_a",
    "enrich_fields": [
      "author",
      "publisher"
      ]
  }
}
POST /_enrich/policy/data-policy/_execute

(3) 创建预处理管道

PUT /_ingest/pipeline/data_lookup
{
  "processors": [
    {
      "enrich": {
        "policy_name": "data-policy",
        "field": "field_a",
        "target_field": "field_from_bindex",
        "max_matches": 1
      }
    },
    {
      "append": {
        "field": "author",
        "value": "{{field_from_bindex.author}}"
      }
    },
    {
      "append": {
        "field": "publisher",
        "value": "{{field_from_bindex.publisher}}"
      }
    },
    {
      "remove": {
        "field": "field_from_bindex"
      }
    }
  ]
}

(4)建立reindex索引

POST _reindex
{
  "source": {
    "index": [
      "lwy_index1"
      ]
  },
  "dest":{
        "index": "lwy_index_max",
        "pipeline": "data_lookup"
      }
}
POST lwy_index_max/_search

可见,索引lwy_index_max实现了索引lwy_index1和索引lwy_index2的融合

新写入的文档通过enrich processor达到了跨索引丰富数据的目的,最终写入目标索引。而丰富数据是借助enrich policy将源索引生成系统只读索引(enrich index)实现的。enrich processor预处理可以算作跨索引处理数据的扩展。

4.5 预处理实战常见问题

1.ingest角色的必要性

所有节点都默认启用ingest角色,因此任何节点都可以完成数据的预处理任务。但当集群数据量级够大且预处理任务繁重时,建议拆分节点角色,设置独立专用的ingest节点。

2.何时指定预处理管道

PUT lwy_index
{
  "settings": {
    "index.default_pipeline": "split_mid"
  }
}

PUT _index_template/lwy_template
{
  "index_patterns":[
      "lwyindex*"
    ],
    "template": {
      "settings":{
        "index.default_pipeline": "split_mid"
      }
    }
}

POST _reindex
{
  "source": {
    "index": "lwy_index1"
  },
  "dest": {
    "index": "lwy_index2",
    "pipeline": "split_mid"
  }
}

POST lwy_index/_update_by_query?pipeline=json_builder