elasticsearch ingest pipeline — tips and tricks #1

This is a series of tips for Elasticsearch ingest pipelines. In this 1st blog, we will go through a few useful techniques, including:

  • adding a current timestamp to the document (acting as a last_update_time)
  • usage of the “pipeline” processor and a simple design pattern for re-using pipeline code
  • exception handling on calling pipelines

If you are interested in some advanced pipeline-processor tips, do read the following blog.

1. adding “last_update_time” to documents

Ingest pipelines help with data massaging in 2 ways:

  • pre-processing of documents before the final ingestion into Elasticsearch data nodes (a.k.a. data massage process)
  • data repair on existing documents based on enhanced business needs (a.k.a. data patching)

No matter whether it is the pre-processing stage or the data repair stage, there is often a common objective => adding a “last_update_time” timestamp field to identify when the change was applied.

There are a couple of ways to add back a current timestamp. Approach 1 is to re-use the ingestion timestamp during the pre-processing stage of the documents.
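A minimal sketch of Approach 1, using a set processor that copies the _ingest.timestamp metadata field into last_update_time (the sample document below is made up to mirror the simulation output):

// Approach 1: copy the ingestion timestamp into last_update_time
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "add last_update_time based on the ingestion timestamp",
    "processors": [
      {
        "set": {
          "field": "last_update_time",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "procurement_id": "12_yuy190",
        "action": "filling_procurement_form",
        "user": "jaime_10234"
      }
    }
  ]
}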

simulation result as follows:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "last_update_time" : "2019-05-06T02:35:47.945808Z",
          "procurement_id" : "12_yuy190",
          "action" : "filling_procurement_form",
          "user" : "jaime_10234"
        },
        "_ingest" : {
          "timestamp" : "2019-05-06T02:35:47.945808Z"
        }
      }
    }
  ]
}

You can see that the output document now contains one additional field, “last_update_time”, pointing exactly to the ingestion timestamp value.

Approach 2 is to add a script processor and create the current timestamp in a (Painless) script.
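A minimal sketch of Approach 2; the Painless one-liner below is just one way to build an ISO-8601 timestamp string, and the original script may well differ:

// Approach 2: build the current timestamp inside a (painless) script processor
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "add last_update_time from a script processor",
    "processors": [
      {
        "script": {
          "lang": "painless",
          "source": "ctx.last_update_time = Instant.ofEpochMilli(new Date().getTime()).toString();"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "procurement_id": "12_yuy190",
        "action": "filling_procurement_form",
        "user": "jaime_10234"
      }
    }
  ]
}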

and the simulate result as follows:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "last_update_time" : "2019-05-06T02:49:40.894Z",
          "procurement_id" : "12_yuy190",
          "action" : "filling_procurement_form",
          "user" : "jaime_10234"
        },
        "_ingest" : {
          "timestamp" : "2019-05-06T02:49:40.894798Z"
        }
      }
    }
  ]
}

Interestingly, you can see that the “last_update_time” is now slightly different from the _ingest.timestamp: the script processor runs before the actual “ingestion”, hence the _ingest.timestamp comes slightly later.

2. pipeline design approach

Now we have tested the approaches for adding back a current timestamp; but many data massage processes need to add this timestamp within their workflow, so does that mean you would need to copy and paste the above code / processor every time you create a new pipeline???

Since Elasticsearch 6.5.x, there is a new “pipeline” processor with which we can invoke another pipeline from within the current pipeline. Now that is something really cool: suddenly all pipelines behave like functions / APIs; you can invoke any of them when necessary, therefore code re-use is possible.

Based on this new feature, you might need to re-factor existing pipeline code to extract the common business logic out and form a functional pipeline for other main-stream pipelines to invoke. In our scenario, adding a “last_update_time” is exactly such a functional pipeline.

Create a new functional pipeline as follows:
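A sketch of such a functional pipeline, re-using the script-processor approach from above (the exact processor choice is up to you):

PUT _ingest/pipeline/func_add_last_update_time
{
  "description": "functional pipeline: add a last_update_time field to the document",
  "processors": [
    {
      "script": {
        "lang": "painless",
        "source": "ctx.last_update_time = Instant.ofEpochMilli(new Date().getTime()).toString();"
      }
    }
  ]
}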

Great, we have added “func_add_last_update_time”; now assume we have another pipeline for certain business logic which would like to add a last_update_time whenever a change is applied to the document(s). A sketch of such a main-stream pipeline, together with a simulation call, follows.
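The raw message format here is an assumption; the sketch assumes a comma-separated “message” and a hypothetical pipeline named parse_procurement_action:

PUT _ingest/pipeline/parse_procurement_action
{
  "description": "main-stream pipeline: parse a raw message into individual fields",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "%{DATA:user_id},%{DATA:action_id},%{DATA:action},%{DATA:action_ts},%{GREEDYDATA:description}"
        ]
      }
    },
    {
      "date": {
        "field": "action_ts",
        "target_field": "action_ts",
        "formats": ["yyyy-MM-dd HH:mm:ss"],
        "timezone": "UTC"
      }
    },
    {
      "remove": {
        "field": "message"
      }
    }
  ]
}

// simulate the main-stream pipeline with a sample document
POST _ingest/pipeline/parse_procurement_action/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "jaime_10234,12_yuy190,filling_procurement_form,2019-04-18 13:12:09,procurement for ABC company on spare parts"
      }
    }
  ]
}

and the simulation result comes back as: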

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "user_id" : "jaime_10234",
          "action_id" : "12_yuy190",
          "action" : "filling_procurement_form",
          "action_ts" : "2019-04-18T13:12:09.000Z",
          "description" : "procurement for ABC company on spare parts"
        },
        "_ingest" : {
          "timestamp" : "2019-05-06T03:25:33.951423Z"
        }
      }
    }
  ]
}

The main-stream pipeline above is quite straightforward: it parses a given “message” into individual fields; there is also a timestamp field conversion, plus removal of the original “message” after usage. If we would like to add back a “last_update_time”, simply append a “pipeline” processor at the end of the workflow =>
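Sticking with the hypothetical parse_procurement_action pipeline from the previous step, the only change is the extra “pipeline” processor at the end:

PUT _ingest/pipeline/parse_procurement_action
{
  "description": "main-stream pipeline: parse a raw message, then add last_update_time",
  "processors": [
    { "grok": { "field": "message", "patterns": [ "%{DATA:user_id},%{DATA:action_id},%{DATA:action},%{DATA:action_ts},%{GREEDYDATA:description}" ] } },
    { "date": { "field": "action_ts", "target_field": "action_ts", "formats": [ "yyyy-MM-dd HH:mm:ss" ], "timezone": "UTC" } },
    { "remove": { "field": "message" } },
    { "pipeline": { "name": "func_add_last_update_time" } }
  ]
}

Simulating the same sample document again gives =>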

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "last_update_time" : "2019-05-06T03:26:13.715Z",
          "user_id" : "jaime_10234",
          "action_id" : "12_yuy190",
          "action" : "filling_procurement_form",
          "action_ts" : "2019-04-18T13:12:09.000Z",
          "description" : "procurement for ABC company on spare parts"
        },
        "_ingest" : {
          "timestamp" : "2019-05-06T03:26:13.71505Z"
        }
      }
    }
  ]
}

Now we get back the “last_update_time” without any trouble! And see how easy it is to re-use code too!

3. exception handling on calling another pipeline

Since we can now call another pipeline like a function call, a new issue surfaces: exception handling on the invoked pipeline…

Let’s create a new functional pipeline and another main-stream pipeline to illustrate the scenario:
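The pipeline names and message format below are made up for illustration: a functional pipeline that converts an “age” field into an integer, and a main-stream pipeline that parses “name:age” out of a message and then invokes it:

PUT _ingest/pipeline/func_convert_age
{
  "description": "functional pipeline: convert the age field into an integer",
  "processors": [
    {
      "convert": {
        "field": "age",
        "type": "integer"
      }
    }
  ]
}

PUT _ingest/pipeline/parse_person
{
  "description": "main-stream pipeline: split the message into name and age, then convert the age",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{DATA:name}:%{GREEDYDATA:age}"]
      }
    },
    {
      "pipeline": {
        "name": "func_convert_age"
      }
    }
  ]
}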

simulation time…
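Assuming the hypothetical parse_person pipeline above, we feed in two documents, one with a numeric age and one with a broken value:

POST _ingest/pipeline/parse_person/_simulate
{
  "docs": [
    { "_source": { "message": "jane doe:46" } },
    { "_source": { "message": "josh blake:a46" } }
  ]
}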

simulation result…

The 1st document in the simulation should work, but the 2nd document won’t, and the reason is simple -> “unable to convert [a46] to integer”. If we would like to catch this exception and do something, simply add an “on_failure” clause =>
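One possible sketch, again on top of the hypothetical parse_person pipeline: a pipeline-level “on_failure” block that drops the partially parsed fields and records the failure details exposed in the _ingest metadata:

PUT _ingest/pipeline/parse_person
{
  "description": "main-stream pipeline with exception handling around the invoked pipeline",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{DATA:name}:%{GREEDYDATA:age}"]
      }
    },
    {
      "pipeline": {
        "name": "func_convert_age"
      }
    }
  ],
  "on_failure": [
    {
      "remove": {
        "field": ["name", "age"],
        "ignore_missing": true
      }
    },
    {
      "set": {
        "field": "error",
        "value": "{{_ingest.on_failure_processor_type}} - {{_ingest.on_failure_message}}"
      }
    }
  ]
}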

simulation result…

// response for the 2nd document
{
  "doc" : {
    "_index" : "_index",
    "_type" : "_type",
    "_id" : "_id",
    "_source" : {
      "message" : "josh blake:a46",
      "error" : """convert - For input string: \"a46\""""
    },
    "_ingest" : {
      "timestamp" : "2019-05-06T03:57:13.584682Z"
    }
  }
}

Now the problematic document(s) would contain the “message” field untouched and a new “error” field stating the exception. Later on, if you want to query only the documents with an exception, an “exists” query would do the trick.

// replace {{target_index}} with the resulting index
GET {{target_index}}/_search
{
  "query": {
    "exists": {
      "field": "error"
    }
  }
}

Bonus: apply the pipelines to existing documents (data repair stage)

To apply the created pipelines to existing documents, you could simply use an _update_by_query =>

// replace the {{pipeline_name}} to any valid pipeline
POST blog_pipeline_tips1/_update_by_query?pipeline={{pipeline_name}}

The important point is that a “pipeline” parameter is added, pointing to the corresponding main-stream pipeline’s ID.

Bonus: set a default pipeline to indices

Since Elasticsearch 6.5.x, a new index setting named “index.default_pipeline” has been available, which simply means all documents ingested into the index would be pre-processed by that default pipeline; for example, the “last_update_time” use case is supposed to run on each new incoming document of an index. The syntax is fairly simple and straightforward:
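For example, a minimal sketch that makes the func_add_last_update_time pipeline the default for the blog_pipeline_tips1 index (index.default_pipeline is a dynamic setting, so it can also be applied to an existing index):

// replace the pipeline name with any valid pipeline ID
PUT blog_pipeline_tips1/_settings
{
  "index.default_pipeline": "func_add_last_update_time"
}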

That’s it for this blog post and hope the tips above ring a bell~
